package http

import (
	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"

	"github.com/influxdata/httprouter"
	"github.com/influxdata/influxdb/v2"
	pcontext "github.com/influxdata/influxdb/v2/context"
	"github.com/influxdata/influxdb/v2/http/metric"
	kitio "github.com/influxdata/influxdb/v2/kit/io"
	"github.com/influxdata/influxdb/v2/kit/tracing"
	kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/storage"
	"github.com/influxdata/influxdb/v2/tsdb"
	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"
	"istio.io/pkg/log"
)

var (
	// ErrMaxBatchSizeExceeded is returned when a points batch exceeds
	// the defined upper limit in bytes. This pertains to the size of the
	// batch after inflation from any compression (i.e. ungzipped).
	ErrMaxBatchSizeExceeded = errors.New("points batch is too large")
)

// WriteBackend is all services and associated parameters required to construct
// the WriteHandler.
type WriteBackend struct {
	influxdb.HTTPErrorHandler
	log                *zap.Logger
	WriteEventRecorder metric.EventRecorder

	PointsWriter        storage.PointsWriter
	BucketService       influxdb.BucketService
	OrganizationService influxdb.OrganizationService
}

// NewWriteBackend returns a new instance of WriteBackend.
func NewWriteBackend(log *zap.Logger, b *APIBackend) *WriteBackend {
	return &WriteBackend{
		HTTPErrorHandler:   b.HTTPErrorHandler,
		log:                log,
		WriteEventRecorder: b.WriteEventRecorder,

		PointsWriter:        b.PointsWriter,
		BucketService:       b.BucketService,
		OrganizationService: b.OrganizationService,
	}
}

// WriteHandler receives line protocol and sends to a publish function.
type WriteHandler struct {
	influxdb.HTTPErrorHandler
	BucketService       influxdb.BucketService
	OrganizationService influxdb.OrganizationService
	PointsWriter        storage.PointsWriter
	EventRecorder       metric.EventRecorder

	router            *httprouter.Router
	log               *zap.Logger
	maxBatchSizeBytes int64
	parserOptions     []models.ParserOption
}

// WriteHandlerOption is a functional option for a *WriteHandler
type WriteHandlerOption func(*WriteHandler)

// WithMaxBatchSizeBytes configures the maximum size for a
// (decompressed) points batch allowed by the write handler
func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
	return func(w *WriteHandler) {
		w.maxBatchSizeBytes = n
	}
}

func WithParserOptions(opts ...models.ParserOption) WriteHandlerOption {
	return func(w *WriteHandler) {
		w.parserOptions = opts
	}
}

// Prefix provides the route prefix.
func (*WriteHandler) Prefix() string {
	return prefixWrite
}

const (
	prefixWrite              = "/api/v2/write"
	msgInvalidGzipHeader     = "gzipped HTTP body contains an invalid header"
	msgInvalidPrecision      = "invalid precision; valid precision units are ns, us, ms, and s"
	msgUnableToReadData      = "unable to read data"
	msgWritingRequiresPoints = "writing requires points"
	msgUnexpectedWriteError  = "unexpected error writing points to database"

	opPointsWriter = "http/pointsWriter"
	opWriteHandler = "http/writeHandler"
)

// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler {
	h := &WriteHandler{
		HTTPErrorHandler:    b.HTTPErrorHandler,
		PointsWriter:        b.PointsWriter,
		BucketService:       b.BucketService,
		OrganizationService: b.OrganizationService,
		EventRecorder:       b.WriteEventRecorder,

		router: NewRouter(b.HTTPErrorHandler),
		log:    log,
	}

	for _, opt := range opts {
		opt(h)
	}

	h.router.HandlerFunc(http.MethodPost, prefixWrite, h.handleWrite)
	return h
}

func (h *WriteHandler) findBucket(ctx context.Context, orgID influxdb.ID, bucket string) (*influxdb.Bucket, error) {
	if id, err := influxdb.IDFromString(bucket); err == nil {
		b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
			OrganizationID: &orgID,
			ID:             id,
		})
		if err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
			return nil, err
		} else if err == nil {
			return b, err
		}
	}

	return h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
		OrganizationID: &orgID,
		Name:           &bucket,
	})
}

func (h *WriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	h.router.ServeHTTP(w, r)
}

func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
	span, r := tracing.ExtractFromHTTPRequest(r, "WriteHandler")
	defer span.Finish()

	ctx := r.Context()
	auth, err := pcontext.GetAuthorizer(ctx)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	req, err := decodeWriteRequest(ctx, r, h.maxBatchSizeBytes)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	org, err := queryOrganization(ctx, r, h.OrganizationService)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}
	span.LogKV("org_id", org.ID)

	sw := kithttp.NewStatusResponseWriter(w)
	recorder := NewWriteUsageRecorder(sw, h.EventRecorder)
	var requestBytes int
	defer func() {
		// Close around the requestBytes variable to placate the linter.
		recorder.Record(ctx, requestBytes, org.ID, r.URL.Path)
	}()

	bucket, err := h.findBucket(ctx, org.ID, req.Bucket)
	if err != nil {
		h.HandleHTTPError(ctx, err, sw)
		return
	}
	span.LogKV("bucket_id", bucket.ID)

	if err := checkBucketWritePermissions(auth, org.ID, bucket.ID); err != nil {
		h.HandleHTTPError(ctx, err, sw)
		return
	}

	opts := append([]models.ParserOption{}, h.parserOptions...)
	opts = append(opts, models.WithParserPrecision(req.Precision))
	parsed, err := NewPointsParser(opts...).ParsePoints(ctx, org.ID, bucket.ID, req.Body)
	if err != nil {
		h.HandleHTTPError(ctx, err, sw)
		return
	}
	requestBytes = parsed.RawSize

	if err := h.PointsWriter.WritePoints(ctx, parsed.Points); err != nil {
		h.HandleHTTPError(ctx, &influxdb.Error{
			Code: influxdb.EInternal,
			Op:   opWriteHandler,
			Msg:  "unexpected error writing points to database",
			Err:  err,
		}, sw)
		return
	}

	sw.WriteHeader(http.StatusNoContent)
}

// checkBucketWritePermissions checks an Authorizer for write permissions to a
// specific Bucket.
func checkBucketWritePermissions(auth influxdb.Authorizer, orgID, bucketID influxdb.ID) error {
	p, err := influxdb.NewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID)
	if err != nil {
		return &influxdb.Error{
			Code: influxdb.EInternal,
			Op:   opWriteHandler,
			Msg:  fmt.Sprintf("unable to create permission for bucket: %v", err),
			Err:  err,
		}
	}
	if pset, err := auth.PermissionSet(); err != nil || !pset.Allowed(*p) {
		return &influxdb.Error{
			Code: influxdb.EForbidden,
			Op:   opWriteHandler,
			Msg:  "insufficient permissions for write",
			Err:  err,
		}
	}
	return nil
}

// PointBatchReadCloser (potentially) wraps an io.ReadCloser in Gzip
// decompression and limits the reading to a specific number of bytes.
func PointBatchReadCloser(rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (io.ReadCloser, error) {
	switch encoding {
	case "gzip", "x-gzip":
		var err error
		rc, err = gzip.NewReader(rc)
		if err != nil {
			return nil, err
		}
	}
	if maxBatchSizeBytes > 0 {
		rc = kitio.NewLimitedReadCloser(rc, maxBatchSizeBytes)
	}
	return rc, nil
}

// NewPointsParser returns a new PointsParser
func NewPointsParser(parserOptions ...models.ParserOption) *PointsParser {
	return &PointsParser{
		ParserOptions: parserOptions,
	}
}

// ParsedPoints contains the points parsed as well as the total number of bytes
// after decompression.
type ParsedPoints struct {
	Points  models.Points
	RawSize int
}

// PointsParser parses batches of Points.
type PointsParser struct {
	ParserOptions []models.ParserOption
}

// ParsePoints parses the points from an io.ReadCloser for a specific Bucket.
func (pw *PointsParser) ParsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "write points")
	defer span.Finish()
	return pw.parsePoints(ctx, orgID, bucketID, rc)
}

func (pw *PointsParser) parsePoints(ctx context.Context, orgID, bucketID influxdb.ID, rc io.ReadCloser) (*ParsedPoints, error) {
	data, err := readAll(ctx, rc)
	if err != nil {
		code := influxdb.EInternal
		if errors.Is(err, ErrMaxBatchSizeExceeded) {
			code = influxdb.ETooLarge
		} else if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) {
			code = influxdb.EInvalid
		}
		return nil, &influxdb.Error{
			Code: code,
			Op:   opPointsWriter,
			Msg:  msgUnableToReadData,
			Err:  err,
		}
	}

	requestBytes := len(data)
	if requestBytes == 0 {
		return nil, &influxdb.Error{
			Op:   opPointsWriter,
			Code: influxdb.EInvalid,
			Msg:  msgWritingRequiresPoints,
		}
	}

	span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "encoding and parsing")
	encoded := tsdb.EncodeName(orgID, bucketID)
	mm := models.EscapeMeasurement(encoded[:])

	points, err := models.ParsePointsWithOptions(data, mm, pw.ParserOptions...)
	span.LogKV("values_total", len(points))
	span.Finish()
	if err != nil {
		log.Error("Error parsing points", zap.Error(err))

		code := influxdb.EInvalid
		if errors.Is(err, models.ErrLimitMaxBytesExceeded) ||
			errors.Is(err, models.ErrLimitMaxLinesExceeded) ||
			errors.Is(err, models.ErrLimitMaxValuesExceeded) {
			code = influxdb.ETooLarge
		}

		return nil, &influxdb.Error{
			Code: code,
			Op:   opPointsWriter,
			Msg:  "",
			Err:  err,
		}
	}

	return &ParsedPoints{
		Points:  points,
		RawSize: requestBytes,
	}, nil
}

func readAll(ctx context.Context, rc io.ReadCloser) (data []byte, err error) {
	defer func() {
		if cerr := rc.Close(); cerr != nil && err == nil {
			if errors.Is(cerr, kitio.ErrReadLimitExceeded) {
				cerr = ErrMaxBatchSizeExceeded
			}
			err = cerr
		}
	}()

	span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "read request body")

	defer func() {
		span.LogKV("request_bytes", len(data))
		span.Finish()
	}()

	data, err = ioutil.ReadAll(rc)
	if err != nil {
		return nil, err

	}
	return data, nil
}

// writeRequest is a request object holding information about a batch of points
// to be written to a Bucket.
type writeRequest struct {
	Org       string
	Bucket    string
	Precision string
	Body      io.ReadCloser
}

// decodeWriteRequest extracts information from an http.Request object to
// produce a writeRequest.
func decodeWriteRequest(ctx context.Context, r *http.Request, maxBatchSizeBytes int64) (*writeRequest, error) {
	qp := r.URL.Query()
	precision := qp.Get("precision")
	if precision == "" {
		precision = "ns"
	}

	if !models.ValidPrecision(precision) {
		return nil, &influxdb.Error{
			Code: influxdb.EInvalid,
			Op:   "http/newWriteRequest",
			Msg:  msgInvalidPrecision,
		}
	}

	bucket := qp.Get("bucket")
	if bucket == "" {
		return nil, &influxdb.Error{
			Code: influxdb.ENotFound,
			Op:   "http/newWriteRequest",
			Msg:  "bucket not found",
		}
	}

	encoding := r.Header.Get("Content-Encoding")
	body, err := PointBatchReadCloser(r.Body, encoding, maxBatchSizeBytes)
	if err != nil {
		return nil, err
	}

	return &writeRequest{
		Bucket:    qp.Get("bucket"),
		Org:       qp.Get("org"),
		Precision: precision,
		Body:      body,
	}, nil
}

// WriteService sends data over HTTP to influxdb via line protocol.
type WriteService struct {
	Addr               string
	Token              string
	Precision          string
	InsecureSkipVerify bool
}

var _ influxdb.WriteService = (*WriteService)(nil)

func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r io.Reader) error {
	precision := s.Precision
	if precision == "" {
		precision = "ns"
	}

	if !models.ValidPrecision(precision) {
		return &influxdb.Error{
			Code: influxdb.EInvalid,
			Op:   "http/Write",
			Msg:  msgInvalidPrecision,
		}
	}

	u, err := NewURL(s.Addr, prefixWrite)
	if err != nil {
		return err
	}

	r, err = compressWithGzip(r)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r)
	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", "text/plain; charset=utf-8")
	req.Header.Set("Content-Encoding", "gzip")
	SetToken(s.Token, req)

	org, err := orgID.Encode()
	if err != nil {
		return err
	}

	bucket, err := bucketID.Encode()
	if err != nil {
		return err
	}

	params := req.URL.Query()
	params.Set("org", string(org))
	params.Set("bucket", string(bucket))
	params.Set("precision", string(precision))
	req.URL.RawQuery = params.Encode()

	hc := NewClient(u.Scheme, s.InsecureSkipVerify)

	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	return CheckError(resp)
}

func compressWithGzip(data io.Reader) (io.Reader, error) {
	pr, pw := io.Pipe()
	gw := gzip.NewWriter(pw)
	var err error

	go func() {
		_, err = io.Copy(gw, data)
		gw.Close()
		pw.Close()
	}()

	return pr, err
}
